In [1]:
%run startup.py
In [2]:
%%javascript
// Fetch and execute the helper script at ./assets/js/ipython_notebook_toc.js
// through jQuery's $.getScript (presumably it builds the notebook's table of
// contents, judging by the file name -- TODO confirm).
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
We also assume familiarity with the marble diagram feature of RxPY.
In [3]:
rst(O.to_iterable)
s = marble_stream("a--b-c|")
l = []
ts = time.time()


def on_next(listed):
    """Print the received value together with the seconds elapsed since `ts`."""
    print('got', listed, time.time()-ts)


# Two passes, subscribed at nearly the same moment so that the printed time
# deltas can be compared:
#   pass 1 -> the plain marble stream
#   pass 2 -> the stream wrapped by to_list(): only one value, the list
for i in (1, 2):
    d = s.subscribe(on_next)
    s = s.to_list()
In [4]:
rst(O.to_blocking)
ts = time.time()
s = O.interval(200).take(3)
sb = s.to_blocking()
# The conversion itself returns instantly:
assert (time.time() - ts) < 0.2
print('''In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of methods, operating on the items emitted by the Observable, that block. Some of the To operators are in this Blocking Obsevable set of extended operations.''')
# Diffing dir(s) with dir(sb) yields these additional attributes:
#   __iter__
#   for_each
#   observable

rst(sb.__iter__)
# Iterate the blocking observable twice -- the results of the two passes are
# not interleaved.
for i in (1, 2):
    for it in sb:
        log(it)

rst(sb.for_each)
sb.for_each(log)

header(".observable -> getting async again")
# Subscribing through .observable gives interleaved output again:
d = subs(sb.observable, name='observer 1')
d = subs(sb.observable, name='observer 2')
In [5]:
rst(O.to_dict)
# Each character is its own key; the stored element is the character doubled.
d = subs(
    O.from_('abc').to_dict(
        key_mapper=lambda x: x,
        element_mapper=lambda a: '%s%s' % (a, a)
    )
)
In [29]:
rst(O.to_future)


def emit(obs):
    """Emit 'first' and 'second' on `obs`, then complete.

    Before each item sleep(.5) is called and the item is logged.

    :param obs: observer offering on_next / on_completed.
    """
    for ev in 'first', 'second':
        sleep(.5)
        log('emitting', ev)
        obs.on_next(ev)
    # vital for the future to get done:
    obs.on_completed()


try:
    # required for py2 (backport of guidos' tulip stuffs, now asyncio)
    # caution: people say this is not production ready and will never be.
    import trollius
    f = rx.Observable.create(emit).to_future(trollius.Future)
    # this is async, not a busy loop
    log('future.result():', f.result())
except Exception as ex:
    # The notebook should always run all cells, so this stays best-effort.
    # Catch Exception rather than using a bare `except:` (which would also
    # swallow KeyboardInterrupt / SystemExit) and show what actually failed
    # instead of always blaming a missing trollius install.
    print ('%s skipping this; pip install trollius required' % ex)
In [34]:
rst(O.from_marbles)
# Build the stream from the marble string, convert it with to_blocking()
# and hand the result to subs().
d = subs(
    rx.Observable.from_string("1-(42)-3-|").to_blocking()
)
In [40]:
rst(O.to_set)
# The input string contains every letter twice; to_set() is applied to it.
d = subs(
    O.from_("abcabc").to_set()
)
Advanced feature: Adding side effects to subscription and unsubscription events.
This is a good read:
Also see the other links in the Rx documentation.
In [54]:
rst(O.subscribe_on)

# start simple:
header('Switching Schedulers')
s = O.just(42, rx.scheduler.ImmediateScheduler())
d = subs(s.subscribe_on(rx.scheduler.TimeoutScheduler()), name='SimpleSubs')
sleep(0.1)

header('Custom Subscription Side Effects')
from rx.scheduler.newthreadscheduler import NewThreadScheduler
from rx.scheduler.eventloopscheduler import EventLoopScheduler


class MySched(NewThreadScheduler):
    """NewThreadScheduler variant that allows side effects at subscription
    and unsubscription time."""

    def schedule(self, action, state=None):
        """Log the action, then schedule it on a fresh EventLoopScheduler.

        :param action: the action to schedule (logged before scheduling).
        :param state: optional state handed through to the scheduler.
        :return: whatever EventLoopScheduler.schedule returns.
        """
        log('new scheduling task', action)
        loop_sched = EventLoopScheduler(thread_factory=self.thread_factory,
                                        exit_if_empty=True)
        return loop_sched.schedule(action, state)


s = O.interval(200).take(2)
s = s.subscribe_on(MySched())
# Two subscriptions to the same stream:
d = subs(s, name="subs1")
d = subs(s, name="subs2")
Via this you can add side effects on any notification to any subscriber.
This example demonstrates what's going on:
In [44]:
rst(O.observe_on)
from rx.scheduler.newthreadscheduler import NewThreadScheduler

header('defining a custom thread factory for a custom scheduler')


def my_thread_factory(target, args=None):
    """Create (without starting) a daemon thread for `target`.

    Just to show that also here we can customize: the name of the new thread
    is printed when it is created.

    :param target: callable the thread will run.
    :param args: optional positional arguments for `target`.
    :return: the new threading.Thread object.
    """
    t = threading.Thread(target=target, args=args or [])
    # Attribute access instead of the deprecated setDaemon() / getName();
    # this also matches the `t.name` usage at the end of this cell.
    t.daemon = True
    print ('\ncreated %s\n' % t.name)
    return t


class MySched:
    """Wrapper around a NewThreadScheduler built with my_thread_factory.

    Attribute lookups that are not satisfied by the wrapper itself are
    logged and forwarded to the wrapped scheduler.
    """

    def __init__(self):
        self.rx_sched = NewThreadScheduler(my_thread_factory)

    def __getattr__(self, a):
        'called whenever the observe_on scheduler is on duty'
        log('RX called', a, 'on mysched\n')
        return getattr(self.rx_sched, a)


mysched = MySched()
s = O.interval(200).take(3) #.delay(100, mysched)
d = subs(s.observe_on(mysched))
sleep(2)
# Function-call form like every other print in this notebook; the former
# bare `print '...'` statement is a SyntaxError on Python 3.
print ('all threads after finish:') # all cleaned up
print (' '.join([t.name for t in threading.enumerate()]))
In [66]:
rst(O.do_action)
def say(v=None):
    """Log 'NI!' plus the value when called with one, otherwise log 'EOF'.

    Usable both as an item handler (called with the item) and as a handler
    that is called without arguments.

    :param v: the received item; None (the default) means "no item".
    """
    # Compare against None instead of relying on truthiness, so that falsy
    # items such as 0 or '' are still reported as items and not as 'EOF'.
    if v is not None:
        log('NI!', v)
    else:
        log('EOF')
# say() serves as handler for the items and for the completion notification.
d = subs(
    O.range(10, 10)
     .take(2)
     .tap(say, on_completed=say)
)
In [72]:
rst(O.finally_action)
# say() is registered as the finally action of a stream built with on_error('err').
d = subs(
    O.on_error('err')
     .take(2)
     .finally_action(say)
)
In [76]:
rst(O.throw)
# range(1, 3) concatenated with a stream built with on_error("ups").
d = subs(
    O.range(1, 3).concat(
        O.on_error("ups")
    )
)
In [85]:
rst(O.timeout)
# timeout(200, other) with `other` being a stream that just emits 'timeout'.
d = subs(
    marble_stream("a-b---c|").timeout(200, O.just('timeout'))
)
# this also works with absolute time. See docstring:
In [11]:
rst(O.timeout_with_selector)
# you get the value and can adjust the timeout accordingly:
# every item x is mapped to O.timer(100 * int(x)).
d = subs(
    marble_stream("2-2-1-1|").timeout_with_selector(
        timeout_duration_mapper=lambda x: O.timer(100 * int(x)),
        other=O.just('timeout')
    )
)
(example: see above)
In [20]:
# Show the reference text for catch_exception.
rst(O.catch_exception)
# Two sources created with on_error and one that just emits 42.
fubar1 = O.on_error('Ups')
fubar2 = O.on_error('Argh')
good = O.just(42)
# Hand the three sources, in this order, to O.catch and subscribe.
d = subs(O.catch(fubar1, fubar2, good))
rst(O.on_error_resume_next)
# Module-level list; every emitter() call appends one more element to it.
bucket = [0]
# emitter(obs): subscription function handed to O.create below.
# NOTE(review): the export lost this cell's indentation; confirm the nesting
# of the statements after the `for` line against the original notebook.
def emitter(obs):
v = bucket[-1]
bucket.append(v)
# Calls obs.on_next for the integers 0 .. len(bucket) + 1.
for i in range(0, len(bucket) + 2):
obs.on_next(i)
# bucket holds more than two elements from the second emitter() call on;
# only then is the error signalled.
if len(bucket) > 2:
log('notify error')
obs.on_error("ups")
log('notify complete')
obs.on_completed()
# Four sources are handed to on_error_resume_next; two of them are built
# from emitter via O.create.
d = subs(O.on_error_resume_next(O.just('running'),
O.create(emitter),
O.create(emitter),
O.just('all good')
))
In [25]:
rst(O.retry)
ts = time.time()
def emit(obs):
    """Emit one 'try <seconds since ts>' item, then either fail or complete.

    While less than one second has passed since the module-level `ts`, the
    attempt ends with on_error('ups') (after sleep(0.2) and a log line).
    Once a second or more has passed it ends with on_completed().

    :param obs: observer offering on_next / on_error / on_completed.
    """
    dt = time.time() - ts
    obs.on_next('try %s' % dt)
    if dt < 1:
        sleep(0.2)
        log('error')
        obs.on_error('ups')
        # on_error is terminal: stop here instead of falling through and
        # additionally calling on_completed() on the same observer.
        return
    obs.on_completed()
# Wrap emit() in an observable and apply retry(10) before subscribing.
d = subs(
    O.create(emit).retry(10)
)
http://www.introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Using:
The Using factory method allows you to bind the lifetime of a resource to the lifetime of an observable sequence. The signature itself takes two factory methods; one to provide the resource and one to provide the sequence. This allows everything to be lazily evaluated.
This mechanism can find varied practical applications in the hands of an imaginative developer. The resource being an IDisposable is convenient; indeed, it makes it so that many types of resources can be bound to, such as other subscriptions, stream reader/writers, database connections, user controls and, with Disposable(Action), virtually anything else.
In [13]:
rst(O.using)
lifetime = 2000


def result(disposable_resource_fac):
    """Sequence factory for O.using: emit the given resource, delayed by `lifetime`."""
    return O.just(disposable_resource_fac).delay(lifetime)


# The resource factory (first argument of O.using) returns whatever subs()
# returns for an interval(100) stream limited to 1000 items.
d2 = subs(
    O.using(
        lambda: subs(O.interval(100).take(1000), name='resource fac\n'),
        result
    ),
    name='outer stream\n'
)
In [6]:
rst(O.start)


def starter():
    """Return the pair ('start: ', <current time.time() value>)."""
    # called only once, async:
    now = time.time()
    return 'start: ', now


s = O.start(starter).concat(O.from_('abc'))
# Two subscriptions to the same stream:
d = subs(s, name='sub1')
d = subs(s, name='sub2')
In [23]:
rst(O.start_async)
def emit(obs):
    """Feed 'first' and 'second' into `obs`, then signal completion.

    sleep(.2) is called and the item is logged before each on_next.
    NOTE(review): nothing else in this cell appears to reference emit().
    """
    for ev in ('first', 'second'):
        sleep(.2)
        log('emitting', ev)
        obs.on_next(ev)
    # vital for the future to get done:
    obs.on_completed()
def future():
    """Create a trollius future that is already resolved with ('42', <time>).

    Logs 'called future' on every invocation so the output shows how often
    the factory runs.

    :return: a finished trollius.Future whose result is the tuple
             ('42', time.time() at creation).
    """
    # only called once:
    log('called future')
    fut = trollius.Future()
    fut.set_result(('42', time.time()))
    # A future can be resolved only once. The former follow-up call
    # set_exception(Exception('ups')) on the already finished future raised
    # InvalidStateError, so this factory never returned its result.
    return fut
try:
    # required for py2 (backport of guidos' tulip stuffs, now asyncio)
    # caution: people say this is not production ready and will never be.
    import trollius
    s = O.start_async(future)
    # Both subscriptions are expected to show the same result:
    d = subs(s, name='subs1')
    d = subs(s, name='subs2')
except Exception as ex:
    # notebook should always run all cells
    print ('%s skipping this; pip install trollius required' % ex)
In [25]:
rst(O.to_async)
# to_async wraps the two-argument adder; the wrapper is then called with (4, 3).
d = subs(
    O.to_async(lambda x, y: x + y)(4, 3)
)
In [ ]: